package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import cn.com.bonc.factory.DaoFactory;
import cn.com.bonc.tool.RuleDAO;
import cn.com.bonc.util.ColumnsUtil;
import cn.com.bonc.util.StructStreamingUtil;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.io.IOException;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.split;


/**
 * Structured Streaming job that reads text files from a configured HDFS
 * directory, filters out malformed lines, splits each pipe-delimited record
 * into named columns, applies a chain of rule DAOs, and prints the result
 * to the console sink.
 *
 * Created by ${RQL} on 2019/1/8
 */
public class StructuredstreamingTest {

    /** Number of rule DAOs applied, in order, to the parsed dataset. */
    private static final int RULE_COUNT = 5;

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("StructuredStreamingFile")
                .master("local[4]")
                .getOrCreate();

        // Stream text files from the HDFS source directory.
        // Supported source formats are text, csv, json and parquet.
        // latestFirst:  whether to process the newest files first when there
        //               is a large backlog of files (default: false).
        // fileNameOnly: whether to identify new files by file name only
        //               instead of the full path (default: false); enabling
        //               it can cause distinct files with the same name to be
        //               treated as the same file.
        Dataset<Row> lines = spark
                .readStream()
                .option("latestFirst", "false")
                .option("fileNameOnly", "false")
                .text(ConfigurationManager.getProperty(Constants.HDFS_SOURCE_PATH));

        lines.printSchema();

        // BUG FIX: Dataset transformations are immutable and return a new
        // dataset. The original code called lines.filter(...) and discarded
        // the result, so malformed rows were never actually removed. Capture
        // the filtered dataset and build the pipeline from it.
        Dataset<Row> standardLines = lines.filter(
                (FilterFunction<Row>) x -> StructStreamingUtil.isStandardData(x.toString()));

        // Split each pipe-delimited line into named columns via a temporary
        // array column, then drop the temporary column.
        Dataset<Row> rowDataset = standardLines
                .select(col("value").cast("string"))
                .withColumn("tmp", split(col("value"), "[|]"))
                .select(ColumnsUtil.getInstance().getColumns("tmp"))
                .drop(col("tmp"));

        rowDataset.printSchema();

        // Apply rule DAOs 1..RULE_COUNT in sequence; each formulate() call
        // returns a new transformed dataset fed into the next rule.
        Dataset<Row> ruled = rowDataset;
        for (int i = 1; i <= RULE_COUNT; i++) {
            RuleDAO ruleDAO = DaoFactory.getUpdateDAO(i);
            ruled = ruleDAO.formulate(ruled);
        }

        // Emit the final dataset to the console sink for inspection.
        StreamingQuery query = ruled.writeStream()
                .outputMode("append")
                .format("console")
                .option("truncate", "false")
                .start();

        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            // NOTE(review): consider logging via SLF4J and/or rethrowing
            // instead of printing the stack trace.
            e.printStackTrace();
        } finally {
            // Release Spark resources even if the query terminates abnormally.
            spark.stop();
        }
    }
}
